package com.yzg.study.kafka.common.kafka;

import com.yzg.study.kafka.common.exception.ServiceException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * Thin wrapper around a single {@link KafkaConsumer} that uses manual partition
 * assignment and manual offset commits.
 *
 * <p>The wrapped consumer is created in the constructor and released in {@link #close()}.
 *
 * <p>NOTE(review): the constructor configures {@link StringDeserializer} for both keys and
 * values regardless of the type arguments chosen for {@code K} and {@code V}; using this class
 * with anything other than {@code String} looks unsafe — confirm with callers.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
@Slf4j
@Configuration
public class KafkaConsumerUtils<K, V> implements Closeable {

    /** The wrapped consumer; also handed out to callers by the {@code assign} methods. */
    private final KafkaConsumer<K, V> consumer;

    /**
     * Creates the underlying consumer with auto-commit disabled, String deserializers for
     * key and value, and {@code auto.offset.reset=earliest}.
     *
     * <p>NOTE(review): neither {@code bootstrap.servers} nor {@code group.id} is set here.
     * Kafka normally needs the former to construct a consumer and the latter to commit
     * offsets — confirm how these are expected to be supplied.
     */
    public KafkaConsumerUtils() {
        Properties props = new Properties();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        this.consumer = new KafkaConsumer<>(props);
    }

    /**
     * Assigns exactly one partition to the consumer, replacing any previous assignment.
     *
     * @param partition the topic partition to consume from
     * @return the underlying consumer, now assigned to {@code partition}
     */
    public KafkaConsumer<K, V> assign(TopicPartition partition) {
        this.consumer.assign(Collections.singleton(partition));
        return this.consumer;
    }

    /**
     * Assigns every partition of the given topic to the consumer, replacing any previous
     * assignment.
     *
     * @param topicName name of the topic whose partitions should be consumed
     * @return the underlying consumer, now assigned to all partitions of the topic
     * @throws ServiceException if no partition information is available for the topic
     */
    public KafkaConsumer<K, V> assign(String topicName) {
        List<PartitionInfo> partitionInfos = queryPartitions(topicName);
        // Size is known up front, so avoid intermediate re-allocations.
        List<TopicPartition> topicPartitions = new ArrayList<>(partitionInfos.size());
        for (PartitionInfo partition : partitionInfos) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        this.consumer.assign(topicPartitions);
        return this.consumer;
    }

    /**
     * Looks up the partitions of a topic.
     *
     * @param topicName name of the topic to inspect
     * @return the non-empty list of partitions reported for the topic
     * @throws ServiceException if the lookup yields {@code null} or an empty list
     */
    public List<PartitionInfo> queryPartitions(String topicName) {
        List<PartitionInfo> partitionInfos = this.consumer.partitionsFor(topicName);
        if (CollectionUtils.isEmpty(partitionInfos)) {
            log.error("{}主题下没有分区信息", topicName);
            throw new ServiceException("{}主题下没有分区信息", topicName);
        }
        return partitionInfos;
    }

    /**
     * Fetches a batch of records from the currently assigned partitions.
     *
     * @param duration maximum time to block waiting for records; the unit is whatever the
     *                 {@link Duration} itself carries
     * @return the records returned by the underlying consumer's {@code poll}
     */
    public ConsumerRecords<K, V> poll(Duration duration) {
        return this.consumer.poll(duration);
    }

    /**
     * Manually and synchronously commits the consumed offsets.
     *
     * <p>Uses {@code commitSync()} so that the call matches its documented synchronous
     * contract; any commit failure is raised to the caller by the underlying consumer.
     */
    public void commit() {
        this.consumer.commitSync();
    }

    /**
     * Closes the underlying consumer.
     *
     * @throws IOException declared for {@link Closeable} compatibility; this method itself
     *                     only delegates to the consumer's {@code close()}
     */
    @Override
    public void close() throws IOException {
        this.consumer.close();
    }

}
